PySpark 搭建ALS协同过滤模型

协同过滤是推荐系统中常用的一种机制,利用用户对于内容的评分搭建交叉的用户偏好以推荐相关内容。其中评分具有多样性,需要根据具体的业务和数据来确认可用的评分信息。这种评分信息可能是用户直接**评分,也可能是间接*从其他数据中提取。在本次落实该项目,基于仅有的少量数据搭建一种推荐机制。

协同过滤在 IPTV 下应用

包括使用矩阵分解方法解析用户-内容矩阵,一种方式显示的用户对内容的评分,第二种方式隐式的方式分析用户的行为动作强度来分析(eg: 利用用户点击次数或者用户观看电影的累积时长)。

在应用中通过显性反馈数据信息进行推荐是一种广泛且直观的方法,因为很容易对显性反馈数据进行解析。非显性的反馈数据信息是对缺少显式反馈数据搭建推荐的良好补充。TV 端推荐引擎反馈数据特点:

  1. 用户非负反馈,用户的非负行为反馈较难直接推断用户的不喜欢行为,而且在推荐应用上因为数据分析处理过程中的“缺失化处理”(对用户没有交互的内容采取缺失处理)。因此关注于这类缺失数据中负反馈是有重要意义的
  2. 显式反馈的天然噪声,显示的反馈很容易被关联上用户偏好和真实动机,但是这种方式存在潜在的问题——例如用户购买后,对产品的态度并不存在天然关联。而在 TV 上用户长时间观看记录,并不一定真实在观看内容
  3. 显性反馈数值指示偏好信息,隐形反馈反馈数值指示置信度。单次事件发送可能是多种原因,但对于重复发生事件能够在一定层度上反映用户“意见”
  4. 隐式反馈推荐评估方法的转变,传统的显式反馈可以通过数值直接反映预测效果,但隐式模型推荐需要考虑到待推荐内容的可用性和其他内容的竞争

模型搭建

本次的应用过程中基于 ALS 算法进行矩阵分解的方式搭建协同过滤推荐系统,落实过程和一般的模型过程一致,而重点在于搭建模型前的评分确认。经过业务沟通、现有数据采集方案和其他内容参考敲定了相关的评分计算模式:$评分= 时长权重观看时长得分 + 次数权重观看次数得分$。

评分计算

计算用户对内容评分过程包括了两个部分,观看时长和观看次数两个部分。两者的得分上分别是单一影片的统计数据占所有影片的内容统计数据,最终的详细计算方案为: $评分=时长权重 * \frac{该影片观看时长}{该用户总观看时长}+次数权重*\frac{该影片观看次数}{该用户总观看次数}$。

>unfold
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 评分计算方法一:根据用户和内容的交互数据统计出播放时长和点播次数
user2item = vod.groupby("itv_account", "节目名称") \
.agg(F.sum(F.col("duration")).alias("span"), F.count(F.column("duration")).alias("frequency"))


# 评分计算方法二:下面注释的方法是使用 collect_list 去将时间聚合
# user2item = vod.groupby("itv_account", "节目名称") \
# .agg(F.collect_list(F.col("duration")).alias("durations"))

# 根据用户数据,统计出各用户交互信息
users = vod.groupby("itv_account") \
.agg(F.sum(F.col("duration")).alias("span_"), F.count(F.column("duration")).alias("frequency_"))

# 计算评分,这里权重
scores = user2item.join(users, on="itv_account") \
.select("itv_account", "节目名称",
(F.col("span") / F.col("span_") + F.col("frequency") / F.col("frequency_")).alias("rate"))

# 构建评分矩阵——因 IPTV 用户观看内容存在重复观看,所以评分统计最后采取累加的方案
data = scores.groupby("itv_account").pivot("节目名称").sum("rate")

模型训练数据拆分

同其他监督学习模型一样,需要拆分为训练数据和测试数据。

>unfold
1
2
3
4
5
6
7
8
9
10
11
from pyspark.ml import evaluation, recommendation
from pyspark.ml import feature, pipeline

# 为方便选择相关字段,在后续处理之前先将特征进行处理,包括名称
userStringIndexer = feature.StringIndexer(inputCol="itv_account" , outputCol="uid")
contentStringIndexer = feature.StringIndexer(inputCol="节目名称" , outputCol="cid")
pipe = pipeline.Pipeline(stages=[userStringIndexer, contentStringIndexer])
transformed_scores = pipe.fit(scores).transform(scores)

# 拆分数据集
(train, test) = transformed_scores.randomSplit([.8, .2], 42)

模型训练

使用 recommendation 模块中的 ALS 实现模型搭建,对于冷启动策略使用 drop 方案:

>unfold
1
2
3
4
5
6
7
# 搭建模型超参数
als = recommendation.ALS(
rank=20, maxIter=10, userCol="uid", itemCol="cid", ratingCol="rate",
numUserBlocks=4, numItemBlocks=4, alpha=1., implicitPrefs=False, coldStartStrategy="drop"
)

model = als.fit(train)

模型评估

因该模型解决的是回归问题,搭建过程中使用了 RMSE 来验证模型:

>unfold
1
2
3
4
5
6
7
8
9
# 利用测试数据验证结果
predictions = model.transform(test)

# 评估器
rmse_evaluator = evaluation.RegressionEvaluator(predictionCol='prediction', labelCol="rate")
print("测试数据集结果 RMSE: %s" % rmse_evaluator.evaluate(predictions))

# 显示结果
predictions.head(3)
预测结果

优化与部署

部署方面,该模型可以得到对于基于用户历史行为信息的协同过滤模型,结果可以得到不同用户的推荐内容,以此作为推荐 candidates。
上述基于 ALS 算法的方式得到了一个模型,但并没有进行详细的优化——可以调整rank,numIterations,lambda,alpha 这些参数,与基准结果比较判断结果。所以优化方面,对参数优化达到结果较优模型和推荐解雇。另一方面该模型是矩阵分解的算法,其副产品是用户和内容的矩阵。可以在后续的模型中应用用户矩阵作为用户特征。

作者

ZenRay

发布于

2021-02-01

更新于

2021-04-11

许可协议

CC BY-NC-SA 4.0